/*
 * Copyright (c) 2021-2021, talkweb 拓维信息 www.talkweb.com.cn.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.talkweb.iot.mqtt.broker.cluster;

import com.talkweb.iot.mqtt.broker.enums.RedisKeys;
import com.talkweb.iot.mqtt.broker.kafka.KafkaTopics;
import com.talkweb.iot.mqtt.broker.service.IMqttClusterService;
import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.mica.core.utils.CharPool;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.tio.core.ChannelContext;
import org.tio.core.Node;

/**
 * MQTT connection status listener.
 *
 * <p>On every client connect/disconnect it updates a per-node Redis collection of
 * client ids (via {@code sAdd}/{@code sRem}) and forwards a {@link Message}
 * describing the event to Kafka through {@link IMqttClusterService}.
 *
 * <p>{@code clusterService} and {@code nodeName} are resolved lazily in
 * {@link #afterSingletonsInstantiated()} rather than injected through the constructor.
 * NOTE(review): presumably this avoids a circular bean dependency - confirm.
 *
 * @author L.cm
 */
@RequiredArgsConstructor
public class RedisMqttConnectListener implements IMqttConnectStatusListener, SmartInitializingSingleton, DisposableBean {
	private final ApplicationContext context;
	private final MicaRedisCache redisCache;
	/** Resolved in {@link #afterSingletonsInstantiated()}; {@code null} until then. */
	private IMqttClusterService clusterService;
	/** Name of this broker node, resolved in {@link #afterSingletonsInstantiated()}; {@code null} until then. */
	private String nodeName;

	/**
	 * Records the client as connected on this node and publishes a CONNECT event.
	 *
	 * @param context  channel of the client that came online
	 * @param clientId id of the client
	 */
	@Override
	public void online(ChannelContext context, String clientId) {
		redisCache.sAdd(getRedisKey(), clientId);
		Message message = getMessage(context, MessageType.CONNECT, clientId);
		clusterService.sendToKafka(KafkaTopics.TOPIC_CLIENT_CONNECTED, message);
	}

	/**
	 * Removes the client from this node's connected collection and publishes a DISCONNECT event.
	 *
	 * @param context  channel of the client that went offline
	 * @param clientId id of the client
	 */
	@Override
	public void offline(ChannelContext context, String clientId) {
		redisCache.sRem(getRedisKey(), clientId);
		Message message = getMessage(context, MessageType.DISCONNECT, clientId);
		clusterService.sendToKafka(KafkaTopics.TOPIC_CLIENT_DISCONNECTED, message);
	}

	/**
	 * Redis key under which the online/offline state of devices is stored;
	 * it is derived from the {@link RedisKeys#CONNECT_STATUS} prefix and this node's name.
	 *
	 * @return redis key
	 */
	private String getRedisKey() {
		return RedisKeys.CONNECT_STATUS.getKey(nodeName);
	}

	/**
	 * Builds the cluster message describing a connect/disconnect event.
	 *
	 * @param context  channel the event originated from; supplies the peer address
	 * @param type     message type to set on the message
	 * @param clientId id of the client
	 * @return message stamped with this node's name, the peer {@code ip:port} and the current time
	 */
	private Message getMessage(ChannelContext context, MessageType type, String clientId) {
		Message message = new Message();
		message.setNode(nodeName);
		message.setClientId(clientId);
		message.setMessageType(type);
		Node clientNode = context.getClientNode();
		message.setPeerHost(clientNode.getIp() + CharPool.COLON + clientNode.getPort());
		message.setTimestamp(System.currentTimeMillis());
		return message;
	}

	/**
	 * Looks up the cluster service and the node name from the application context.
	 */
	@Override
	public void afterSingletonsInstantiated() {
		this.clusterService = context.getBean(IMqttClusterService.class);
		MqttServerCreator serverCreator = context.getBean(MqttServerCreator.class);
		this.nodeName = serverCreator.getNodeName();
	}

	/**
	 * Deletes this node's connection-status collection on shutdown.
	 *
	 * <p>Does nothing when the node name was never resolved (i.e.
	 * {@link #afterSingletonsInstantiated()} did not run or failed), because a key
	 * built from a {@code null} node name would not identify this node's collection.
	 */
	@Override
	public void destroy() throws Exception {
		if (nodeName == null) {
			// Never initialised: nothing was stored under this node's key.
			return;
		}
		// Delete the collection on shutdown.
		redisCache.del(getRedisKey());
	}
}
